[HUDI-2207] Support independent flink hudi clustering function#3599
[HUDI-2207] Support independent flink hudi clustering function#3599yuzhaojing merged 1 commit intoapache:masterfrom
Conversation
c795bbe to
9443126
Compare
2828a67 to
1bb7f93
Compare
There was a problem hiding this comment.
I guess the unit is MB ?
| */ | ||
| public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>> | ||
| extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> { | ||
| private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class); |
There was a problem hiding this comment.
Should we extend from FlinkSizeBasedClusteringPlanStrategy instead ?
| if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { | ||
| throw new HoodieClusteringException("Clustering failed to write to files:" | ||
| + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); | ||
| } |
There was a problem hiding this comment.
writeTableMetadata is missing.
|
|
||
| private static final Logger LOG = LogManager.getLogger(FlinkClusteringPlanActionExecutor.class); | ||
|
|
||
| public FlinkClusteringPlanActionExecutor(HoodieEngineContext context, |
There was a problem hiding this comment.
We can use the HoodieData to merge the code with SparkClusteringPlanActionExecutor, this can be done with separate following PR.
| .key("clustering.schedule.enabled") | ||
| .booleanType() | ||
| .defaultValue(false) // default false for pipeline | ||
| .withDescription("Async clustering, default false for pipeline"); |
There was a problem hiding this comment.
Schedule the compaction plan, default false
| .key("clustering.tasks") | ||
| .intType() | ||
| .defaultValue(10) | ||
| .withDescription("Parallelism of tasks that do actual clustering, default is 10"); |
There was a problem hiding this comment.
Change the default value same with compaction.tasks, which is 4.
| * The clustering task identifier. | ||
| */ | ||
| private int taskID; | ||
|
|
There was a problem hiding this comment.
The event should include a fileId to deduplicate for tasks failover/retry. Take CompactionCommitEvent as a reference. Because there are multiple input file ids for a HoodieClusteringGroup thus the CompactionCommitEvent, we can use the first file group id to distinguish.
| this.schema = new Schema.Parser().parse(writeConfig.getSchema()); | ||
| this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema); | ||
| this.requiredPos = getRequiredPositions(); | ||
|
|
There was a problem hiding this comment.
Where is the requiredPos used for ?
| for (ClusteringOperation clusteringOp : clusteringOps) { | ||
| try { | ||
| Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema())); | ||
| HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); |
| private void doClustering(String instantTime, HoodieClusteringGroup clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException { | ||
| List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); | ||
| boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); | ||
|
|
There was a problem hiding this comment.
The HoodieClusteringGroup has num of output file groups, the current code has only one file group (or more if the parquet size hits the threshold), can we find a way to set up the parallelism of bulk_insert writer as that ?
| conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions); | ||
| if (config.sortColumns != null) { | ||
| conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns); | ||
| } |
There was a problem hiding this comment.
Where is the CLUSTERING_SORT_COLUMNS used for ?
vinothchandar
left a comment
There was a problem hiding this comment.
cc @yihua wondering if we can reuse a lot more code here?
Yes, the core clustering action should be extracted out, independent of engines, using |
f86df2e to
d3eb4e3
Compare
|
@hudi-bot run azure |
|
Hello, there seems come conflicts and the compile failure, can you fix that @yuzhaojing ? |
|
Sure, I will fix this. |
|
@yuzhaojing The refactoring of clustering action is done in #4847, in a way to have engine-agnostic clustering logic in |
e5838e1 to
777c0ac
Compare
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file |
There was a problem hiding this comment.
This clazz can be removed.
| this.rowType = rowType; | ||
| this.gComputer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer"); | ||
| this.gComparator = createSortCodeGenerator().generateRecordComparator("SortComparator"); | ||
| } |
There was a problem hiding this comment.
Maybe we can make gComputer and gComparator local variables.
| * @param record Generic record. | ||
| * @param columns Names of the columns to get values. | ||
| * @return Column value if a single column, or concatenated String values by comma. | ||
| */ |
There was a problem hiding this comment.
Does this change still necessary ?
|
hi @yuzhaojing rebase thise pr. the CI's error is fixed. |
e1689c4 to
cf4355d
Compare
This comment was marked as resolved.
This comment was marked as resolved.
eb878d4 to
9f98d28
Compare
|
@hudi-bot run azure |
|
@hudi-bot run azure |
packaging/hudi-flink-bundle/pom.xml
Outdated
| <artifactId>flink-avro</artifactId> | ||
| <version>${flink.version}</version> | ||
| <scope>compile</scope> | ||
| </dependency> |
There was a problem hiding this comment.
HoodieClusteringGroup is a avro model in ClusteringPlanEvent.
There was a problem hiding this comment.
But you should not depend on flink-avro i guess, there are already many model clazzes in hudi-flink code paths.
There was a problem hiding this comment.
fixed. use ClusteringGroupInfo instead of HoodieClusteringGroup.
| <groupId>org.apache.flink</groupId> | ||
| <artifactId>flink-avro</artifactId> | ||
| <version>${flink.version}</version> | ||
| <scope>provided</scope> |
|
We may need to resolve the conflict. |
fixed. |
|
|
||
| private void updateTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, | ||
| HoodieCommitMetadata commitMetadata, | ||
| HoodieInstant hoodieInstant) { |
There was a problem hiding this comment.
updateTableMetadat seems useless now ~
danny0405
left a comment
There was a problem hiding this comment.
+1, thanks for contribution @yuzhaojing , we may need to fix the clustering plan scheduling issue in the following PR.
Sure, I will fix the issue in following PR to support clustering plan scheduling in coordinator. |
|
Thanks for suggest, Danny! |
Tips
What is the purpose of the pull request
(For example: This pull request adds quick-start document.)
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.